package com.pengpeng.rabbitmq.producer.send;

import com.pengpeng.rabbitmq.bean.MqExchange;
import com.pengpeng.rabbitmq.exception.RabbitMQExceptionUtils;
import com.pengpeng.rabbitmq.message.MqMessage;
import com.pengpeng.rabbitmq.utils.UUIDUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;

import javax.annotation.PostConstruct;

/**
 * Base class for RabbitMQ message producers.
 *
 * <p>It wraps every payload in an {@link MqMessage}, publishes it through the injected
 * {@link RabbitTemplate}, and registers itself as that template's confirm and returns callback.
 * Subclasses supply the business handling via {@link #handleConfirmCallback} and
 * {@link #handleReturnCallback}.
 *
 * <p>NOTE(review): the callbacks are installed on the injected template instance itself, so
 * several concrete subclasses sharing one template would compete for the same callback slot —
 * confirm how many subclasses are registered as beans.
 *
 * @author Peng Peng (彭鹏)
 * @date 2021/10/19.
 */
public abstract class AbstractSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    /**
     * Name of the direct exchange {@code amq.direct}. Not referenced inside this class; the
     * convenience {@code send} overloads use {@link MqExchange#DEFAULT_DIRECT_EXCHANGE} instead.
     */
    public final static String DEFAULT_EXCHANGE = "amq.direct";

    /**
     * Default priority value. Not referenced inside this class.
     */
    public static final int MESSAGE_PRIORITY = 1;

    public final Log logger = LogFactory.getLog(this.getClass());

    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Whether an unroutable message should be handed back to the returns callback.
     * true: the returns callback is invoked for messages that match no queue.
     * false (default): the flag is left untouched on the template; if nothing else enables it,
     * unroutable messages are dropped without notification.
     */
    @Value("${spring.message.mandatory:false}")
    private Boolean mandatory;

    /**
     * Sends a plain message to the default direct exchange, using the queue name as routing key.
     * The exchange is {@link MqExchange#DEFAULT_DIRECT_EXCHANGE} (presumably {@code amq.direct} —
     * confirm against {@code MqExchange}); the queue has to be bound to it under its own name.
     *
     * @param queue   queue name, also used as the routing key; must not be empty
     * @param content message content; must not be empty
     */
    public void send(String queue, String content) {
        this.requireQueueAndContent(queue, content);
        this.send(MqExchange.DEFAULT_DIRECT_EXCHANGE, queue, content, null, UUIDUtils.generateUuid());
    }

    /**
     * Sends a message that carries a per-message expiration.
     *
     * @param queue      queue name, also used as the routing key; must not be empty
     * @param content    message content; must not be empty
     * @param expireTime expiration in milliseconds
     */
    public void send(String queue, String content, int expireTime) {
        this.requireQueueAndContent(queue, content);
        // The expiration is carried as a string message property.
        MessagePostProcessor expirationSetter = message -> {
            message.getMessageProperties().setExpiration(String.valueOf(expireTime));
            return message;
        };
        this.send(MqExchange.DEFAULT_DIRECT_EXCHANGE, queue, content, expirationSetter, UUIDUtils.generateUuid());
    }

    /**
     * Sends a message with an explicit exchange, routing key, payload, optional post-processor
     * and message id.
     * TODO: the interaction style is still to be refined.
     *
     * @param exchange             exchange name; must not be empty
     * @param routingKey           routing key; must not be empty
     * @param object               payload; must not be null, and must not be empty when it is text
     * @param messagePostProcessor optional customisation of the outgoing message; may be null
     * @param messageId            message id; a UUID is generated when it is empty
     */
    public void send(String exchange, String routingKey, Object object, MessagePostProcessor messagePostProcessor, String messageId) {
        if (StringUtils.isEmpty(exchange)) {
            RabbitMQExceptionUtils.throwRabbitMQException("交换机不能为空");
        }

        if (StringUtils.isEmpty(routingKey)) {
            RabbitMQExceptionUtils.throwRabbitMQException("路由键不能为空");
        }

        // Only textual payloads can be "empty"; casting an arbitrary payload to String would
        // fail with a ClassCastException before anything is sent.
        boolean emptyText = object instanceof CharSequence && StringUtils.isEmpty((CharSequence) object);
        if (object == null || emptyText) {
            RabbitMQExceptionUtils.throwRabbitMQException("发送的内容不能为空");
        }

        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(StringUtils.isEmpty(messageId) ? UUIDUtils.generateUuid() : messageId);

        MqMessage mqMessage = new MqMessage();
        mqMessage.setMessageBody(object);
        mqMessage.setMessageId(correlationData.getId());
        mqMessage.setExchangeName(exchange);
        // The routing key doubles as the queue name in the envelope.
        mqMessage.setQueueName(routingKey);
        mqMessage.setRoutingKey(routingKey);

        if (messagePostProcessor == null) {
            this.rabbitTemplate.convertAndSend(exchange, routingKey, mqMessage, correlationData);
        } else {
            // Let the caller-supplied post-processor adjust the message before it is sent.
            this.rabbitTemplate.convertAndSend(exchange, routingKey, mqMessage, messagePostProcessor, correlationData);
        }
    }

    /**
     * Business handling of a publisher confirm. Subclasses implement their own logic here.
     *
     * @param messageId id taken from the correlation data of the confirmed message; null when
     *                  the confirm arrived without correlation data
     * @param ack       true when the broker acknowledged the message
     * @param cause     failure reason supplied with the confirm, if any
     */
    public abstract void handleConfirmCallback(String messageId, boolean ack, String cause);

    /**
     * Business handling of a message that was returned because it matched no queue.
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied with the return
     * @param replyText  reply text supplied with the return
     * @param routingKey routing key the message was published with
     */
    public abstract void handleReturnCallback(Message message, int replyCode, String replyText,
                                              String routingKey);

    /**
     * Registers this service as the confirm and returns callback of the template and applies
     * the configured mandatory flag.
     */
    @PostConstruct
    public final void init() {
        this.logger.info("sendservice 初始化...... ");

        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnsCallback(this);
        // Only ever switch the flag on, so a template that was already configured as mandatory
        // elsewhere is not silently downgraded by this property's default.
        if (Boolean.TRUE.equals(this.mandatory)) {
            this.rabbitTemplate.setMandatory(true);
        }
    }

    /**
     * Publisher-confirm callback: logs the confirm and delegates to
     * {@link #handleConfirmCallback}.
     *
     * @param correlationData correlation data of the confirmed message; may be null when the
     *                        message was published without any
     * @param ack             true when the broker acknowledged the message
     * @param cause           failure reason, if any
     */
    @Override
    public final void confirm(CorrelationData correlationData, boolean ack, String cause) {
        // String concatenation renders a null reference as "null" instead of throwing.
        this.logger.info("confirm-----correlationData:" + correlationData + "---ack:" + ack + "----cause:" + cause);
        // TODO: persist a record of the confirm (database or ES)
        String messageId = correlationData == null ? null : correlationData.getId();
        this.handleConfirmCallback(messageId, ack, cause);
    }

    /**
     * Returns callback entry point invoked with a {@link ReturnedMessage}; unpacks it and
     * forwards to the five-argument overload so logging and {@link #handleReturnCallback}
     * are applied in one place.
     *
     * @param returned the returned message together with its reply code, reply text, exchange
     *                 and routing key
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        this.returnedMessage(returned.getMessage(), returned.getReplyCode(), returned.getReplyText(),
                returned.getExchange(), returned.getRoutingKey());
    }

    /**
     * Handles a returned (unroutable) message: logs it and delegates to
     * {@link #handleReturnCallback}.
     *
     * @param message    the returned message
     * @param replyCode  reply code supplied with the return
     * @param replyText  reply text supplied with the return
     * @param exchange   exchange the message was published to
     * @param routingKey routing key the message was published with
     */
    @Override
    public final void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        this.logger.info("return-----message:" + message.toString() + "---replyCode:" + replyCode + "----replyText:" + replyText + "----exchange:" + exchange + "----routingKey:" + routingKey);
        // TODO: persist a record of the return (database or ES)
        this.handleReturnCallback(message, replyCode, replyText, routingKey);
    }

    /**
     * Shared precondition check of the convenience {@code send} overloads: reports an empty
     * queue name or empty content through {@link RabbitMQExceptionUtils}.
     */
    private void requireQueueAndContent(String queue, String content) {
        if (StringUtils.isEmpty(queue)) {
            RabbitMQExceptionUtils.throwRabbitMQException("发送的队列不能为空");
        }
        if (StringUtils.isEmpty(content)) {
            RabbitMQExceptionUtils.throwRabbitMQException("内容不能为空");
        }
    }
}
